#include "rpc_channel.hpp"

#include <functional>

#include <google/protobuf/descriptor.h>

#include "../base/logger.h"
#include "rpc.pb.h"

namespace easyasio
{
namespace net
{

RpcChannel::RpcChannel()
  : codec_(std::bind(&RpcChannel::onRpcMessage, this, std::placeholders::_1, std::placeholders::_2))
{
    LOG_INFO("RpcChannel::ctor - {}", (void*)this);
}

RpcChannel::RpcChannel(const TcpConnectionPtr& conn)
  : codec_(std::bind(&RpcChannel::onRpcMessage, this, std::placeholders::_1, std::placeholders::_2)),
    conn_(conn)
{
    LOG_INFO("RpcChannel::ctor - {}", (void*)this);
}

RpcChannel::~RpcChannel()
{
    LOG_INFO("RpcChannel::dtor - ", (void*)this);
}

void RpcChannel::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                  google::protobuf::RpcController* controller,
                  const ::google::protobuf::Message* request,
                  ::google::protobuf::Message* response,
                  ::google::protobuf::Closure* done)
{
    RpcMessage message;
    message.set_type(MSGTYPE_REQUEST);
    int64_t id = id_++;
    message.set_id(id);
    message.set_service(method->service()->name());
    message.set_method(method->name());
    message.set_request(request->SerializeAsString()); // FIXME: error check
    RpcCodec::send(conn_, message);

    OutstandingCall out = { response, done };
    std::lock_guard<std::mutex> gard(mutex_);
    outstandings_[id] = out;
}

void RpcChannel::onMessage(const TcpConnectionPtr& conn, Buffer* buf)
{
    codec_.onMessage(conn, buf);
}

void RpcChannel::onRpcMessage(const TcpConnectionPtr& conn, const RpcMessage& message)
{
    assert(conn == conn_);
    //printf("%s\n", message.DebugString().c_str());
    if (message.type() == MSGTYPE_RESPONSE)
    {
        int64_t id = message.id();
        assert(message.has_response());

        OutstandingCall out = { NULL, NULL };

        {
            std::lock_guard<std::mutex> gard(mutex_);
            std::map<int64_t, OutstandingCall>::iterator it = outstandings_.find(id);
            if (it != outstandings_.end())
            {
                out = it->second;
                outstandings_.erase(it);
            }
        }

        if (out.response)
        {
            out.response->ParseFromString(message.response());
            if (out.done)
            {
                out.done->Run();
            }
            delete out.response;
        }
    }
    else if (message.type() == MSGTYPE_REQUEST)
    {
        // FIXME: extract to a function
        if (services_)
        {
            std::map<std::string, google::protobuf::Service*>::const_iterator it = services_->find(message.service());
            if (it != services_->end())
            {
                google::protobuf::Service* service = it->second;
                assert(service != NULL);
                const google::protobuf::ServiceDescriptor* desc = service->GetDescriptor();
                const google::protobuf::MethodDescriptor* method
                  = desc->FindMethodByName(message.method());
                if (method)
                {
                    google::protobuf::Message* request = service->GetRequestPrototype(method).New();
                    request->ParseFromString(message.request());
                    google::protobuf::Message* response = service->GetResponsePrototype(method).New();
                    int64_t id = message.id();
                    service->CallMethod(method, NULL, request, response,
                      NewCallback(this, &RpcChannel::doneCallback, response, id));
                    delete request;
                }
                else
                {
                  // FIXME:
                }
          }//if (it != services_->end())
          else
          {
            // FIXME:
          }
        }
        else
        {
          // FIXME:
        }
    }
    else if (message.type() == ERROR)
    {
    }
}

void RpcChannel::doneCallback(::google::protobuf::Message* response, int64_t id)
{
    RpcMessage message;
    message.set_type(MSGTYPE_RESPONSE);
    message.set_id(id);
    message.set_response(response->SerializeAsString()); // FIXME: error check
    RpcCodec::send(conn_, message);
    delete response;
}

}
}